Introduce a max_read_time

Daniel O'Connor 10 years ago
parent
commit
d841b3e4b5
1 changed files with 29 additions and 13 deletions
  1. 29 13
      app/models/agents/mqtt_agent.rb

+ 29 - 13
app/models/agents/mqtt_agent.rb

@@ -41,14 +41,14 @@ module Agents
41 41
       temperature and other events are being published.
42 42
 
43 43
       <pre><code>{
44
-        'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858'
44
+        'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858',
45 45
         'topic' => 'the_thing_system/demo'
46 46
       }
47 47
       </code></pre>
48 48
 
49 49
       Subscribe to all topics
50 50
       <pre><code>{
51
-        'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858'
51
+        'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858',
52 52
         'topic' => '/#'
53 53
       }
54 54
       </code></pre>
@@ -81,24 +81,25 @@ module Agents
81 81
       {
82 82
         'uri' => 'mqtts://user:pass@locahost:8883',
83 83
         'ssl' => :TLSv1,
84
-        'ca_file' => './ca.pem',
84
+        'ca_file'  => './ca.pem',
85 85
         'cert_file' => './client.crt',
86 86
         'key_file' => './client.key',
87
-        'topic' => 'huginn'
87
+        'topic' => 'huginn',
88
+        'max_read_time' => '10'
88 89
       }
89 90
     end
90 91
 
91 92
     def mqtt_client
92
-      client = MQTT::Client.new(options['uri'])
93
+      @client ||= MQTT::Client.new(options['uri'])
93 94
 
94 95
       if options['ssl']
95
-        client.ssl = options['ssl'].to_sym
96
-        client.ca_file = options['ca_file']
97
-        client.cert_file = options['cert_file']
98
-        client.key_file = options['key_file']
96
+        @client.ssl = options['ssl'].to_sym
97
+        @client.ca_file = options['ca_file']
98
+        @client.cert_file = options['cert_file']
99
+        @client.key_file = options['key_file']
99 100
       end
100 101
 
101
-      client
102
+      @client
102 103
     end
103 104
 
104 105
     def receive(incoming_events)
@@ -106,15 +107,30 @@ module Agents
106 107
         incoming_events.each do |event|
107 108
           c.publish(options['topic'], payload)
108 109
         end
110
+
111
+        c.disconnect
109 112
       end
110 113
     end
111 114
 
112 115
 
113 116
     def check
114 117
       mqtt_client.connect do |c|
115
-        c.get(options['topic']) do |topic,message|
116
-          create_event :payload => { 'topic' => topic, 'message' => JSON.parse(message), 'time' => Time.now.to_i }
117
-        end
118
+
119
+        Timeout::timeout(options['max_read_time']) {        
120
+          c.get(options['topic']) do |topic, message|
121
+
122
+            # A lot of services generate JSON. Try that first
123
+            payload = JSON.parse(message) rescue message
124
+
125
+            create_event :payload => { 
126
+              'topic' => topic, 
127
+              'message' => payload, 
128
+              'time' => Time.now.to_i 
129
+            }
130
+          end
131
+        } rescue TimeoutError
132
+
133
+        c.disconnect   
118 134
       end
119 135
     end
120 136